on_data_available 콜백(Callback) 함수의 활용
DDS(Data Distribution Service) 미들웨어를 활용한 분산 시스템, 특히 초저지연(Ultra-Low Latency)이 요구되는 알고리즘 트레이딩 시스템을 구축함에 있어 데이터 수신부의 설계는 전체 성능을 좌우하는 가장 결정적인 요소다. on_data_available 콜백 함수는 단순한 데이터 수신 알림 이상의 의미를 지닌다. 이는 미들웨어의 내부 네트워크 스레드와 애플리케이션의 사용자 공간(User Space)이 만나는 접점이자, 메모리 관리 전략과 스레딩 모델이 충돌하고 융합되는 지점이다. 본 장에서는 OMG(Object Management Group) 표준 명세에 기반한 on_data_available의 정확한 작동 원리, 고성능 처리를 위한 아키텍처 패턴, 그리고 한국의 시스템 트레이딩 개발자들에게 친숙한 Kiwoom Open API 모델과의 비교 분석을 통해, 실전적이고 견고한 데이터 수신 전략을 수립하는 방법을 해설한다.
1. DDS 리스너(Listener) 아키텍처와 트리거 메커니즘
on_data_available을 올바르게 이해하기 위해서는 먼저 DDS가 정의하는 비동기 통지(Asynchronous Notification) 체계인 리스너 아키텍처를 거시적으로 조망해야 한다. DDS 미들웨어는 데이터 중심(Data-Centric) 통신을 지향하며, 이는 데이터의 값이 변하거나 상태가 변하는 사건(Event)을 중심으로 시스템이 구동됨을 의미한다.
1.1 계층적 리스너 구조와 우선순위
DDS 엔티티(Entity)들은 상호 계층적인 관계를 맺고 있으며, 이는 이벤트 통지 시스템에도 그대로 적용된다. on_data_available은 DataReaderListener 인터페이스에 정의된 가상 함수이지만, 데이터가 도착했다고 해서 무조건 이 함수가 즉시 호출되는 것은 아니다. RTI Connext DDS 및 OMG 표준 명세에 따르면, 데이터 수신 상태(Read Communication Status)가 변경될 때 미들웨어는 다음의 순서로 처리자를 탐색한다.1
- DDS.SubscriberListener.on_data_on_readers: 미들웨어는 가장 먼저 해당
DataReader를 소유한 상위Subscriber엔티티에 리스너가 장착되어 있는지, 그리고 그 리스너가on_data_on_readers콜백을 활성화(StatusMask설정)했는지 확인한다. - DDS.DataReaderListener.on_data_available: 만약 상위
Subscriber레벨에서 처리가 이루어지지 않는다면(리스너가 없거나, 해당 콜백이 비활성화된 경우), 그제야 개별DataReader에 부착된on_data_available이 호출된다.
이러한 계층 구조가 시사하는 바는 크다. 만약 개발자가 선물 옵션 데이터와 현물 데이터 간의 동기화가 필요한 차익거래(Arbitrage) 전략을 구사한다면, 개별 on_data_available보다는 상위 SubscriberListener를 통해 여러 DataReader의 이벤트를 한곳에서 제어하는 것이 논리적으로 타당할 수 있다. 반면, 각 종목(Symbol) 별로 독립적인 전략이 수행되는 구조라면 DataReaderListener를 사용하는 것이 모듈화 관점에서 유리하다.1
1.2 상태 플래그(Status Changed Flag)와 호출 조건
on_data_available은 데이터가 수신 큐(Receive Queue)에 도착할 때마다 호출되는 것이 원칙이지만, 엄밀하게는 “데이터 수신 상태 플래그(StatusChangedFlag)가 FALSE에서 TRUE로 변경될 때” 트리거된다.2 이는 고속으로 데이터가 유입되는 상황에서 미묘한 차이를 만들어낸다.
- 엣지 트리거(Edge-Triggered) 성격: 리스너는 상태의 변화(Change)에 반응한다. 만약 애플리케이션이
on_data_available내에서 도착한 데이터를 모두 처리하지 않고 남겨둔 채 리턴한다면, 미들웨어에 따라서는 추가적인 데이터 도착이 없을 경우 다시 콜백을 호출하지 않을 수 있다. - 재진입 방지: 일반적으로 DDS 구현체들은 리스너의 중복 호출을 방지하기 위해 내부적인 락(Lock)이나 상태 관리를 수행한다. 따라서 콜백 내에서 모든 데이터를 소진(
take또는read를 통해)하는 루프 구조가 필수적으로 요구된다.4
2. 스레딩 모델(Threading Model)과 동시성 위험
on_data_available의 구현에서 개발자들이 가장 빈번하게 범하는 실수는 이 함수가 실행되는 **스레드의 정체(Identity)**를 간과하는 것이다. 이는 단순한 함수 호출이 아니라, 미들웨어의 심장부로 들어가는 문이다.
2.1 미들웨어 내부 수신 스레드(Internal Receive Thread)
대부분의 상용 DDS 구현체(RTI Connext, Fast DDS 등)에서 on_data_available은 애플리케이션이 생성한 스레드가 아닌, 미들웨어가 관리하는 **내부 수신 스레드(Receive Thread)**에 의해 호출된다.6 이 스레드는 네트워크 소켓(UDP/TCP)으로부터 패킷을 읽어(recv), RTPS(Real-Time Publish Subscribe) 프로토콜을 해석하고, 역직렬화(Deserialization)를 수행하여 DataReader의 캐시에 데이터를 저장하는 중차대한 임무를 맡고 있다.
이 사실이 함의하는 바는 명확하다. “리스너 콜백 내에서의 지연은 곧 네트워크 수신 버퍼의 정체를 의미한다.”
2.2 블로킹(Blocking)의 치명적 결과
만약 on_data_available 내부에서 다음과 같은 작업이 수행된다면, 시스템은 심각한 성능 저하 또는 장애에 직면하게 된다.
- 과도한 연산: 복잡한 파생변수 계산이나 시뮬레이션.
- I/O 작업: 디스크 로깅, 데이터베이스 접근, 화면 UI 업데이트.
- 락(Lock) 대기: 다른 스레드가 점유 중인 Mutex 획득 시도.
이러한 블로킹이 발생했을 때 나타나는 연쇄 반응은 다음과 같다.7
| 현상 | 메커니즘 | 결과 |
|---|---|---|
| 소켓 버퍼 오버플로 | 수신 스레드가 콜백에 묶여 소켓의 recv를 호출하지 못함 | 운영체제 커널 레벨에서 UDP 패킷 드롭 발생. 데이터 유실. |
| NACK 폭풍 (NACK Storm) | 신뢰성(Reliability) QoS가 켜져 있을 경우, 드롭된 패킷에 대한 재전송 요청(NACK)이 급증 | 네트워크 대역폭 포화 및 재전송 처리에 의한 추가적인 지연 발생(Latency Spike). |
| 공유 자원 기아 (Starvation) | 통상적으로 하나의 도메인 참여자(Participant) 내에서 여러 DataReader가 수신 스레드 풀을 공유함 | 하나의 토픽 처리가 지연되면, 전혀 관계없는 다른 토픽의 데이터 수신까지 멈춤. |
따라서 DDS 프로그래밍의 절대적인 제1원칙은 **“리스너는 절대 블로킹되어서는 안 된다(Never Block in a Listener Callback)”**이다.7
3. 데이터 접근 패턴: Read vs Take와 Zero-Copy
콜백 함수 내부에서의 핵심 로직은 미들웨어 캐시로부터 데이터를 애플리케이션 영역으로 가져오는 것이다. DDS는 이를 위해 read()와 take()라는 두 가지 접근 방식을 제공하며, 메모리 복사 비용을 제거하는 Zero-Copy 메커니즘을 지원한다.
3.1 Read()와 Take()의 의미론적 차이
- read(): 미들웨어 캐시에 있는 데이터의 복사본(혹은 뷰)을 가져오되, 캐시에서 데이터를 제거하지 않는다. 동일한 데이터를 여러 번 읽어야 하거나, 히스토리 관리가 필요한 경우 사용한다.4
- take(): 데이터를 가져옴과 동시에 미들웨어 캐시에서 해당 샘플을 제거(De-queue)한다. 트레이딩 시스템과 같이 최신 시세가 중요하고, 한번 처리한 데이터는 다시 볼 필요가 없는 스트리밍 처리에 적합하다. 메모리 자원을 즉시 회수할 수 있어 고성능 처리에 유리하다.4
3.2 Zero-Copy와 LoanedSamples
C++와 같은 시스템 언어에서 성능 최적화의 핵심은 메모리 복사(Memcpy)를 줄이는 것이다. DDS는 LoanedSamples라는 개념을 통해 이를 구현한다.
// Modern C++ (C++11 이상) 예시
void on_data_available(dds::sub::DataReader<MarketData>& reader) {
// take() 호출 시 인자를 주지 않거나 빈 시퀀스를 주면 Zero-Copy 모드로 동작
dds::sub::LoanedSamples<MarketData> samples = reader.take();
for (const auto& sample : samples) {
if (sample.info().valid()) {
// sample.data()는 미들웨어 내부 버퍼를 직접 가리키는 참조임
process_tick(sample.data());
}
}
// 루프를 벗어나 samples 객체가 소멸될 때 자동으로 return_loan()이 호출됨
}
4
이 코드에서 sample.data()는 애플리케이션의 메모리에 복사된 데이터가 아니라, 미들웨어가 관리하는 수신 버퍼 내의 메모리 주소를 직접 가리킨다. 이는 수 기가바이트의 데이터를 처리할 때 엄청난 성능 이득을 제공하지만, **데이터의 소유권(Ownership)**이 여전히 미들웨어에 있다는 점을 명심해야 한다. samples 객체가 소멸되어 대여(Loan)가 반환(return_loan)된 이후에 해당 데이터의 주소에 접근하면 Use-After-Free 오류가 발생한다.
4. Kiwoom Open API vs DDS: 이벤트 핸들링 패러다임의 전환
한국의 시스템 트레이딩 환경에서 가장 널리 사용되는 Kiwoom Open API의 이벤트 모델과 DDS의 모델을 비교하는 것은, 기존 개발자들이 DDS 환경으로 전환할 때 겪는 인지 부조화를 해소하는 데 매우 유용하다.
4.1 Kiwoom Open API: 윈도우 메시지 기반의 싱글 스레드 모델
Kiwoom Open API는 OCX(OLE Control Extension) 방식을 사용하며, 윈도우 운영체제의 메시지 펌프(Message Pump)에 의존한다. 데이터 수신 이벤트인 OnReceiveTrData는 다음과 같은 특징을 가진다.13
- 순차적 처리 (Serialization): 모든 이벤트는 메인 UI 스레드(STA: Single Threaded Apartment) 큐에 쌓여 하나씩 순서대로 처리된다. 앞선
OnReceiveTrData처리가 끝나지 않으면 다음 시세 데이터인OnReceiveRealData는 처리되지 못하고 대기한다. - 동시성 부재: 단일 스레드이므로 변수에 대한 동시 접근(Race Condition)을 걱정할 필요가 없다. 이는 개발을 단순하게 만들지만, 멀티코어 CPU의 성능을 활용하지 못하게 한다.
- 데이터 타입:
BSTR(COM 문자열) 타입을 주로 사용하여, 문자열 파싱에 상당한 오버헤드가 발생한다.13
4.2 DDS: 멀티 스레드 기반의 반응형 모델
반면 DDS의 on_data_available은 근본적으로 다른 접근 방식을 취한다.
| 비교 항목 | Kiwoom Open API (OnReceiveTrData) | DDS (on_data_available) |
|---|---|---|
| 실행 스레드 | 메인 UI 스레드 (Single Thread) | 미들웨어 내부 수신 스레드 (Multi-Thread Possible) |
| 동시성 | 없음 (순차 실행 보장) | 높음 (동시 실행 가능성 있음, 락 필요) |
| 블로킹 영향 | UI가 멈춤 (“렉” 발생), 후속 데이터 지연 | 패킷 손실, 네트워크 재전송 폭주, 전체 시스템 불안정 |
| 데이터 전달 | 문자열(BSTR) 기반 파싱 필요 | 바이너리 구조체(Struct) 기반 Zero-Copy |
| 메모리 관리 | OS 및 API가 관리 | 개발자가 take/return_loan으로 직접 관리 |
4
Kiwoom API에 익숙한 개발자가 DDS로 넘어올 때 가장 주의해야 할 점은 **“데이터가 순서대로 얌전하게 기다려주지 않는다”**는 것이다. DDS에서는 데이터가 폭포수처럼 쏟아지며, 이를 받아내는 그릇(Listener)이 튼튼하고 빠르지 않으면 시스템은 붕괴한다.
5. 아키텍처 패턴: 스레드 안전 큐를 활용한 생산자-소비자 패턴
앞서 언급한 “리스너 내 블로킹 금지” 원칙과 “데이터 처리의 복잡성“이라는 상충되는 요구사항을 해결하기 위한 표준적인 해법은 생산자-소비자(Producer-Consumer) 패턴을 적용하는 것이다. 리스너는 데이터를 수집하여 큐(Queue)에 넣는 생산자 역할만 수행하고, 실제 무거운 처리는 별도의 워커 스레드(Worker Thread)가 소비자가 되어 수행한다.14
5.1 C++11 표준을 활용한 스레드 안전 큐 구현
표준 STL의 std::queue는 스레드 안전하지 않으므로, mutex와 condition_variable을 사용하여 직접 래핑(Wrapping)해야 한다. 다음은 DDS 리스너와 결합하기 위한 최적화된 큐의 구현 예시다.14
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_;
public:
// 데이터를 큐에 추가 (Producer: Listener에서 호출)
void push(T value) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(value));
}
cond_.notify_one(); // 대기 중인 워커 스레드를 깨움
}
// 데이터를 큐에서 꺼냄 (Consumer: Worker Thread에서 호출)
// 큐가 비어있으면 데이터가 들어올 때까지 대기(Blocking)
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]{ return!queue_.empty(); });
value = std::move(queue_.front());
queue_.pop();
}
// 넌블로킹 방식의 pop (옵션)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) return false;
value = std::move(queue_.front());
queue_.pop();
return true;
}
};
5.2 리스너와 워커 스레드의 결합
이제 이 큐를 사용하여 on_data_available을 구현한다. 핵심은 리스너 내부에서 데이터를 처리하지 않고, 가장 빠른 방법(복사 또는 이동)으로 큐에 밀어 넣고 즉시 리턴하는 것이다.
class MarketDataListener : public dds::sub::NoOpDataReaderListener<MarketData> {
private:
ThreadSafeQueue<MarketData>& queue_;
public:
explicit MarketDataListener(ThreadSafeQueue<MarketData>& q) : queue_(q) {}
void on_data_available(dds::sub::DataReader<MarketData>& reader) override {
// 1. Zero-Copy로 데이터 접근
dds::sub::LoanedSamples<MarketData> samples = reader.take();
// 2. 유효한 데이터만 큐로 전송
for (const auto& sample : samples) {
if (sample.info().valid()) {
// 주의: LoanedSamples의 메모리는 리턴해야 하므로,
// 데이터를 큐에 넣을 때는 복사(Copy)가 일어남.
// MarketData 구조체가 크다면 여기서 병목이 될 수 있으므로
// 필요한 필드만 추출하거나 별도의 메모리 풀 전략이 필요함.
queue_.push(sample.data());
}
}
} // 3. 함수 종료 시 Loan 반환
};
이 패턴을 사용하면 미들웨어의 수신 스레드는 큐에 데이터를 넣는(Push) 아주 짧은 시간 동안만 락을 획득하고 즉시 해제되므로, 네트워크 버퍼 오버플로 문제를 효과적으로 예방할 수 있다.
6. 고급 활용: StatusCondition과 WaitSet의 전략적 사용
모든 상황에서 리스너가 정답은 아니다. 때로는 리스너의 비동기성이 오히려 시스템의 복잡도를 높일 수 있다. 특히 여러 조건이 동시에 만족되었을 때만 동작해야 하는 복합 이벤트 처리(Complex Event Processing)나, 엄격한 실시간성보다는 처리량(Throughput)이 중요한 배치 처리의 경우 WaitSet 모델이 더 적합할 수 있다.6
6.1 WaitSet vs Listener 비교
| 특성 | 리스너 (Listener) | 웨이트셋 (WaitSet) |
|---|---|---|
| 패러다임 | Push (이벤트 주도) | Pull (폴링 또는 대기) |
| 지연 시간 (Latency) | 최상 (컨텍스트 스위칭 최소화) | 보통 (스레드 깨어남 비용 발생) |
| 코드 흐름 | 콜백 분산으로 파편화될 수 있음 | 순차적 흐름 제어 용이 |
| 스레드 제어 | 미들웨어가 제어 (위험) | 애플리케이션이 제어 (안전) |
| 추천 시나리오 | 단순 데이터 수신, HFT 시세 처리 | 복잡한 상관관계 분석, UI 연동 |
18
6.2 하이브리드 접근법
최근의 모던 C++ DDS 애플리케이션들은 이 두 가지를 혼합하여 사용하기도 한다. 예를 들어, 시세 데이터와 같이 빈도가 높고 지연에 민감한 데이터는 on_data_available 리스너를 통해 처리하고, 계좌 잔고 확인이나 시스템 상태 모니터링과 같이 빈도가 낮고 동기화가 필요한 작업은 WaitSet을 사용하는 식이다.
7. 구현 시 주의사항 및 트러블슈팅
on_data_available을 구현할 때 자주 발생하는 문제와 그 해결책을 정리한다.
7.1 예외 처리 (Exception Handling)
C++ 환경에서 리스너 콜백 내부에서 예외(Exception)가 발생하여 함수 밖으로 던져지면(Throw), 미들웨어 스레드가 비정상 종료되거나 정의되지 않은 동작(Undefined Behavior)을 유발할 수 있다. 따라서 반드시 noexcept 수준의 예외 처리를 해야 한다.
void on_data_available(...) {
try {
// 데이터 처리 로직
} catch (const std::exception& e) {
// 로그만 남기고 절대 예외를 다시 던지지 말 것
std::cerr << "Exception in listener: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception in listener" << std::endl;
}
}
7.2 Loan 미반환으로 인한 자원 고갈
read()나 take()를 호출할 때 LoanedSamples 객체를 사용하지 않고 저수준 API를 사용하면서 return_loan을 호출하지 않는 경우, 미들웨어의 수신 버퍼 풀(Receive Buffer Pool)이 고갈된다.11 이렇게 되면 더 이상 새로운 데이터를 수신할 수 없게 되며, 겉으로는 시스템이 멈춘 것처럼 보인다. Modern C++ API를 사용하면 RAII 패턴에 의해 이 문제가 대부분 해결되지만, 명시적 관리가 필요한 경우 각별한 주의가 필요하다.
7.3 무한 루프와 재진입
드물지만 on_data_available 내부에서 다시 동일한 토픽에 데이터를 발행(Publish)하고, 이것이 다시 루프백(Loopback)되어 수신되는 구조라면 무한 재귀 호출에 빠질 수 있다. 이를 방지하기 위해 ignore_participant QoS 등을 활용하여 자신이 보낸 데이터는 수신하지 않도록 설정해야 한다.
8. 요약
on_data_available 콜백 함수는 DDS 기반 시스템에서 데이터가 유입되는 첫 번째 관문이다. 이 함수를 작성할 때는 **“빠르게 읽고(Take), 빠르게 넘기고(Queueing), 빠르게 빠져나온다(Return)”**는 3원칙을 기억해야 한다.
- 아키텍처: 리스너는 미들웨어 스레드에서 실행되므로 절대 블로킹하지 않는다.
- 데이터 접근:
take()와 Zero-Copy(LoanedSamples)를 활용하여 메모리 오버헤드를 최소화한다. - 패턴: 스레드 안전 큐를 도입하여 수신과 처리를 분리(Decoupling)한다.
- 비교: Kiwoom API의 순차적 사고방식에서 벗어나 동시성(Concurrency)을 고려한 설계를 적용한다.
이러한 원칙들을 준수함으로써, 개발자는 수백만 건의 시세 데이터가 쏟아지는 극한의 환경에서도 굳건히 동작하는 고성능 트레이딩 시스템을 구축할 수 있을 것이다. 이는 단순한 코딩 기술을 넘어, 실시간 분산 시스템의 본질을 이해하고 제어하는 엔지니어링의 영역이다.
9. 참고 자료
- Package: DDS.Listener - RTI Community, https://community.rti.com/static/documentation/connext-dds/current/doc/api/connext_dds/api_ada/dds-listener.ads.html
- Data Distribution Service for Real-Time Systems Specification - Object Management Group (OMG), https://www.omg.org/spec/DDS/1.1/PDF/
- Data Distribution Service (DDS) - Object Management Group (OMG), https://www.omg.org/spec/DDS/1.4/PDF
- RTI Connext Modern C++ API: DataReader Use Cases, https://community.rti.com/static/documentation/connext-dds/current/doc/api/connext_dds/api_cpp2/group__DDSReaderExampleModule.html
- CoreDX DDS Modern C++ API: DataReader Use Cases - Twin Oaks Computing, Inc, https://www.twinoakscomputing.com/documents/refman_html_5.0/CoreDX_DDS_CXX_Reference_5.0/cookbook_datareader.html
- Data protection in Listener callback | Data Distribution Service (DDS) Community RTI Connext Users, https://community.rti.com/forum-topic/data-protection-listener-callback
- Never Block in a Listener Callback | Data Distribution Service (DDS) Community RTI Connext Users, https://community.rti.com/best-practices/never-block-listener-callback
- Multiple datareaderlistener in different classes | Data Distribution Service (DDS) Community RTI Connext Users, https://community.rti.com/forum-topic/multiple-datareaderlistener-different-classes
- DDS Reader not dropping messages - java - Stack Overflow, https://stackoverflow.com/questions/60537723/dds-reader-not-dropping-messages
- DDS::DataReaderListener Class Reference - Twin Oaks Computing, Inc, https://www.twinoakscomputing.com/documents/refman_html_4.0.10/CoreDX_DDS_CPP_Reference_4.0.10/classDDS_1_1DataReaderListener.html
- Loaning and Returning Data and SampleInfo Sequences - RTI Community, https://community.rti.com/static/documentation/connext-dds/current/doc/manuals/connext_dds_professional/users_manual/users_manual/Loaning_and_Returning_Data_and_SampleInf.htm
- RTI Connext Modern C++ API: dds::sub::DataReader< T > Class Template Reference, https://community.rti.com/static/documentation/connext-dds/current/doc/api/connext_dds/api_cpp2/classdds_1_1sub_1_1DataReader.html
- Kiwoom Openapi W Devguide Ver 1.0 | PDF - Scribd, https://www.scribd.com/document/352129345/Kiwoom-Openapi-w-Devguide-Ver-1-0
- Implement thread-safe queue in C++ - GeeksforGeeks, https://www.geeksforgeeks.org/dsa/implement-thread-safe-queue-in-c/
- C++11 thread-safe queue - Stack Overflow, https://stackoverflow.com/questions/15278343/c11-thread-safe-queue
- A simple thread-safe queue implementation based on std::list<> and C++11 threading primitives. - GitHub Gist, https://gist.github.com/holtgrewe/8728757
- Introduction to OpenDDS, https://opendds.org/about/articles/Article-Intro.html
- Waitsets | Data Distribution Service (DDS) Community RTI Connext Users, https://community.rti.com/examples/waitsets
- Proper use of WaitSets and Conditions | Data Distribution Service (DDS) Community RTI Connext Users, https://community.rti.com/howto/proper-use-waitsets-and-conditions
- 18.8 Listeners - RTI Community, https://community.rti.com/static/documentation/connext-dds/current/doc/manuals/connext_dds_professional/users_manual/users_manual/Listeners.htm